{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "_M1WAAa_N3MO"
   },
   "source": [
    "## 2. Working with files\n",
    "\n",
    "<img src=\"https://tuplex.cs.brown.edu/_static/img/logo.png\" width=\"128px\" style=\"float: right;\" />\n",
    "\n",
    "In the 2nd part of the Tuplex intro series, we'll take a look at how to work with CSV and text files."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "ZOobvZcVO2_H"
   },
   "source": [
    "### 2.1 Basic IO - Reading CSV files\n",
    "To read a CSV file, Tuplex provides the API function `csv`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "IAGEc-okO2im"
   },
   "outputs": [],
   "source": [
    "import tuplex\n",
    "\n",
    "c = tuplex.Context({'tuplex.redirectToPythonLogging':False})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "zdb8DfszEzC8"
   },
   "source": [
    "In the following cells, we use some sample data from Google Colab. We can simply load it into Tuplex using the `csv` command."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "MJz8q4Tw9Bsy"
   },
   "outputs": [],
   "source": [
    "ds = c.csv('sample_data/california_housing_train.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "GXauZfPs9H2i",
    "outputId": "42ea3904-793a-4389-c35f-40d8fb11ee0e"
   },
   "outputs": [],
   "source": [
    "ds.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "XdGZgph6E8kz"
   },
   "source": [
    "Without any further information, Tuplex automatically deduces types for each column. In order to check what types Tuplex deduced, we can use the `columns` and `types` properties of a Tuplex dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "EsosZ2csE71x",
    "outputId": "ebe514f1-a085-4885-bed9-2f0554f6da75"
   },
   "outputs": [],
   "source": [
    "columns = ds.columns\n",
    "types = ds.types\n",
    "\n",
    "# print out as nicely formatted dictionary\n",
    "dict(zip(columns, types))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "ebt3udXFFctc"
   },
   "source": [
    "Sometimes, however, it may be desirable to assign specific types to individual columns. Tuplex provides a mechanism for this as well, the `type_hints` argument:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "3_kkRMKfFb7W",
    "outputId": "66ea527b-d6c1-4718-f6a1-4a8564928d5d"
   },
   "outputs": [],
   "source": [
    "c.csv('sample_data/california_housing_train.csv', type_hints={'longitude': float, 'latitude': str}).show(4)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "TZgHfZFYdGTP"
   },
   "source": [
    "Let's say we now want to create a file containing only data entries where the `housing_median_age` is larger than `50`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "Ato7fqLjc5H1"
   },
   "outputs": [],
   "source": [
    "ds.filter(lambda r: r['housing_median_age'] > 50).tocsv('lt50.csv', num_parts=0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "PCgh3TkJd-9Y"
   },
   "source": [
    "To speed up data output, Tuplex by default uses multiple threads to write multiple output parts, such as `lt50.part0.csv`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "6YNy969JdfBG",
    "outputId": "f2e53ea1-fdd6-4a52-914e-5dc82d7a6ea4"
   },
   "outputs": [],
   "source": [
    "!head lt50.part0.csv"
   ]
  },
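  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because the output is split into parts, code that consumes it usually has to collect all part files first. The following is a minimal sketch in plain Python, not part of the Tuplex API: the helpers `part_number` and `list_parts` are hypothetical, and they assume the `<prefix>.part<N>.<ext>` naming seen above.\n",
    "\n",
    "```python\n",
    "import glob\n",
    "\n",
    "def part_number(path, prefix, ext):\n",
    "    # 'lt50.part12.csv' -> 12 (assumed naming scheme: <prefix>.part<N>.<ext>)\n",
    "    return int(path[len(prefix) + len('.part'):-len('.' + ext)])\n",
    "\n",
    "def list_parts(prefix, ext):\n",
    "    # sort numerically so that part10 comes after part9, not after part1\n",
    "    paths = glob.glob(glob.escape(prefix) + '.part*.' + ext)\n",
    "    return sorted(paths, key=lambda p: part_number(p, prefix, ext))\n",
    "\n",
    "print(list_parts('lt50', 'csv'))\n",
    "```\n",
    "\n",
    "Sorting by the numeric part index rather than by file name keeps the parts in the order they were numbered once there are ten or more of them."
   ]
  },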
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "ETBMyA2GeGVA"
   },
   "source": [
    "Besides CSV files, Tuplex also has experimental support for reading and writing [ORC files](https://orc.apache.org/), which may be a more space-efficient solution depending on the data and workload."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "8W97qlM9eBkL"
   },
   "outputs": [],
   "source": [
    "ds.toorc('lt50.orc')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "cSCTtDPAGwM6"
   },
   "source": [
    "Similarly, ORC files can be read using the `orc` command."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "ACpfbMz5GS4Q",
    "outputId": "ee663fda-187f-43bc-d17e-c7e20ad19a6d"
   },
   "outputs": [],
   "source": [
    "c.orc('lt50.part0.orc').show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "3sJ05Og2rubO"
   },
   "source": [
    "### 2.2 Working with larger files\n",
    "Naturally, the benefit of Tuplex's compilation comes into play when working with larger files. To demonstrate this, let's work with the original 311 data. A subset of it (1GB, ~212MB to download) can be downloaded via the following command:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "GZ5ahr4zryaP",
    "outputId": "3a570a08-7e6c-4dc5-ccd9-3b68c6aede0e"
   },
   "outputs": [],
   "source": [
    "!gdown https://drive.google.com/uc?id=18e2GyoQKLnQ2_uaUcaSOsLRlIT-7tqpN && tar xf 311_subset.tar.gz && mv 311_subset.csv sample_data/"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "psV4--h7w5vK"
   },
   "source": [
    "Next, let's create a new context with more memory to process the larger file. The old context could still be reused, albeit at the cost of a lot of disk swapping. We therefore delete the old context first to free up its memory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "WZQeKTxF3hvH"
   },
   "outputs": [],
   "source": [
    "del c"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "MEiheSMPFQmy",
    "outputId": "899dd3c1-32ad-489d-fef9-5e12b9e71c1e"
   },
   "outputs": [],
   "source": [
    "!head sample_data/311_subset.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "gO0jOErMxBYq",
    "outputId": "3e7f23e7-49b7-4fd8-8882-4dfe297b6127"
   },
   "outputs": [],
   "source": [
    "c = tuplex.Context({'tuplex.redirectToPythonLogging':True, 'tuplex.executorMemory':'3G', 'tuplex.driverMemory':'3G'})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "QdOhBdtYkyPn"
   },
   "source": [
    "Again, we can use Tuplex's autodetection feature to load the file and assign meaningful default types."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "3T56PhtlvGaL",
    "outputId": "e0b5530e-42d0-42cc-9dc0-241da1fa5838"
   },
   "outputs": [],
   "source": [
    "ds = c.csv('sample_data/311_subset.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "sMIkneXZvZZY",
    "outputId": "4a4f63e7-669c-49a2-c0b4-04493603a28c"
   },
   "outputs": [],
   "source": [
    "dict(zip(ds.columns, ds.types))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "81tBWIvZtW0x"
   },
   "source": [
    "Executing even a simple query on the input data creates a logical plan under the hood. Tuplex optimizes this plan into a physical plan and generates efficient code for it, which is ultimately lowered to native code optimized for the machine it runs on."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "KsSHOrfIvhIO",
    "outputId": "16fec20d-1313-48ba-b8b4-5cdd8dbea1e0"
   },
   "outputs": [],
   "source": [
    "ds.selectColumns(['Unique Key']).show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "uPqLcSjJt80V"
   },
   "source": [
    "As for every operation, we can retrieve help using Python's built-in documentation feature."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "kgnGE2OYntU4",
    "outputId": "e187f6cd-3f8d-44aa-a58c-a3806a6c074e"
   },
   "outputs": [],
   "source": [
    "help(ds.selectColumns)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "foF-WYEnuKbm"
   },
   "source": [
    "For example, the documentation of `selectColumns` shows that columns can also be selected by integer index instead of by name, which gives more flexibility."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "G07YcJeGuU7K",
    "outputId": "706413be-67d7-472b-b45f-857d81c69b57"
   },
   "outputs": [],
   "source": [
    "ds.selectColumns([0, 1]).show(3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "O0K0POANzMGJ"
   },
   "source": [
    "Let's now build a slightly more complicated pipeline. As an initial step, let's investigate what kinds of complaint types there are. To find the corresponding column, we can use the metadata associated with a dataset and then design a first, exploratory query."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "_LXRLILHzQd5",
    "outputId": "2e3232a2-5e8b-47ca-917e-45e567fa24eb"
   },
   "outputs": [],
   "source": [
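    "def print_table(arr, break_after=5):\n",
    "    # print the entries in rows of `break_after` items each;\n",
    "    # stepping by `break_after` avoids a trailing empty row\n",
    "    for start in range(0, len(arr), break_after):\n",
    "        print(' | '.join(arr[start:start + break_after]))\n",
    "\n",
    "print_table(ds.columns)"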
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "aW2AjU9r0Jqt",
    "outputId": "7d6696ab-7721-4735-9523-800d8f74853c"
   },
   "outputs": [],
   "source": [
    "complaint_types = ds.selectColumns(['Complaint Type']).unique().collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "JaN8v0Zu0hpT",
    "outputId": "6389c068-37e5-4d5d-d9e8-96ad9edb5b8d"
   },
   "outputs": [],
   "source": [
    "print(complaint_types)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "rV8aG_rV0lAE"
   },
   "source": [
    "Looking at the data, we see that there are some complaints regarding mosquitoes, likely because it gets quite hot and humid in New York City in summer. Can the data back this up?\n",
    "\n",
    "To find out, let's count the number of mosquito complaints per month for a single year. A helpful function for aggregating the results is `aggregateByKey`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "l-5hyAme0xga",
    "outputId": "42ef85f7-340d-4396-9e4a-fdada8211861"
   },
   "outputs": [],
   "source": [
    "help(ds.aggregateByKey)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "aYeMEv-81nQT"
   },
   "source": [
    "Next, let's use UDFs to extract the month and year of each complaint, and limit the search to mosquito-related complaint types so that Tuplex processes fewer rows."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "gvPtyBDE1m1_",
    "outputId": "a7a51af1-3295-401c-fd38-d649f8d02cf2"
   },
   "outputs": [],
   "source": [
    "ds.selectColumns(['Created Date']).show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "A3sguZZ30xiq",
    "outputId": "9768489f-7858-46e5-827b-6a3dfe918d1a"
   },
   "outputs": [],
   "source": [
    "year_to_investigate = 2019\n",
    "\n",
    "def extract_month(row):\n",
    "    date = row['Created Date']\n",
    "    date = date[:date.find(' ')]\n",
    "    return int(date.split('/')[0])\n",
    "\n",
    "def extract_year(row):\n",
    "    date = row['Created Date']\n",
    "    date = date[:date.find(' ')]\n",
    "    return int(date.split('/')[-1])\n",
    "\n",
    "ds2 = ds.withColumn('Month', extract_month) \\\n",
    "  .withColumn('Year', extract_year) \\\n",
    "  .filter(lambda row: 'Mosquito' in row['Complaint Type']) \\\n",
    "  .filter(lambda row: row['Year'] == year_to_investigate) \\\n",
    "  .selectColumns(['Month', 'Year', 'Complaint Type'])\n",
    "\n",
    "\n",
    "ds2.show(5)\n"
   ]
  },
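  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The two UDFs rely on the layout of the `Created Date` strings. As a quick check outside of Tuplex, the same parsing can be tried on a hand-written value. This is a sketch under assumptions: the helper `month_and_year` is hypothetical, and the sample value is made up to match the month/day/year layout the UDFs expect, not copied from the dataset.\n",
    "\n",
    "```python\n",
    "def month_and_year(created):\n",
    "    # keep only the date part in front of the first space\n",
    "    date = created.split(' ')[0]\n",
    "    parts = date.split('/')\n",
    "    # first field is the month, last field is the year\n",
    "    return int(parts[0]), int(parts[-1])\n",
    "\n",
    "# made-up sample value, for illustration only\n",
    "print(month_and_year('07/04/2019 10:15:00 AM'))\n",
    "# prints (7, 2019)\n",
    "```\n",
    "\n",
    "Unlike `date[:date.find(' ')]`, `split(' ')[0]` also handles a value without a time component: `find` returns `-1` in that case, so the slice would drop the last character of the date."
   ]
  },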
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "wENeFNWEejM8"
   },
   "source": [
    "We can now use the `aggregateByKey` function to count the number of mosquito complaints per month in 2019."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "joNXIjhSeh0i",
    "outputId": "2865828b-2cfa-4d10-c6bd-a6a902d8de0d"
   },
   "outputs": [],
   "source": [
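    "def combine_udf(a, b):\n",
    "    # merge two partial counts\n",
    "    return a + b\n",
    "\n",
    "def aggregate_udf(agg, row):\n",
    "    # every row adds one to the running count\n",
    "    return agg + 1\n",
    "\n",
    "# count rows per month, starting each count at 0\n",
    "ds2.aggregateByKey(combine_udf, aggregate_udf, 0, [\"Month\"]).show()"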
   ]
  },
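  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One way to read the two UDFs, sketched in plain Python and not Tuplex's actual implementation: `aggregate_udf` folds the rows of one partition into a partial result per key, and `combine_udf` merges partial results of the same key across partitions. The helper `aggregate_by_key` and the sample partitions below are hypothetical and only serve as illustration.\n",
    "\n",
    "```python\n",
    "def combine_udf(a, b):\n",
    "    return a + b\n",
    "\n",
    "def aggregate_udf(agg, row):\n",
    "    return agg + 1\n",
    "\n",
    "def aggregate_by_key(partitions, combine, aggregate, initial_value, key):\n",
    "    # step 1: fold each partition on its own, one accumulator per key\n",
    "    partials = []\n",
    "    for rows in partitions:\n",
    "        acc = {}\n",
    "        for row in rows:\n",
    "            k = row[key]\n",
    "            acc[k] = aggregate(acc.get(k, initial_value), row)\n",
    "        partials.append(acc)\n",
    "    # step 2: merge the per-partition accumulators key by key\n",
    "    result = {}\n",
    "    for acc in partials:\n",
    "        for k, v in acc.items():\n",
    "            result[k] = combine(result[k], v) if k in result else v\n",
    "    return result\n",
    "\n",
    "# made-up rows, split into two partitions\n",
    "partitions = [\n",
    "    [{'Month': 7}, {'Month': 7}, {'Month': 12}],\n",
    "    [{'Month': 12}, {'Month': 7}],\n",
    "]\n",
    "print(aggregate_by_key(partitions, combine_udf, aggregate_udf, 0, 'Month'))\n",
    "# prints {7: 3, 12: 2}\n",
    "```\n",
    "\n",
    "Keeping the two functions separate lets partitions be processed independently and merged afterwards, which is why a counting aggregate needs both `agg + 1` and `a + b`."
   ]
  },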
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "kY_4W26xhmYZ"
   },
   "source": [
    "However, mosquito complaints turn out to be rare. In total, there are only 4 complaints for the whole year, 3 of them in December. With so little support, we can't draw any meaningful conclusions about mosquitoes in NYC from the 311 dataset.\n",
    "\n",
    "Let's step back and check which kind of complaint is actually the most common:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "RSV4XLHgh_gC",
    "outputId": "2b85475c-3538-4550-e3f7-a81308c9d4b7"
   },
   "outputs": [],
   "source": [
    "data = ds.aggregateByKey(combine_udf, aggregate_udf, 0, [\"Complaint Type\"]).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "CnkMPDVOXZmo"
   },
   "source": [
    "To see what the most common complaint is, let's sort the output:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "u7JG8-TpXYsS",
    "outputId": "9c00573f-1435-40bb-df05-8571f5d27361"
   },
   "outputs": [],
   "source": [
    "data = sorted(data, key=lambda x: -x[1])\n",
    "\n",
    "data[:5]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "pbHtmBoaYqQD"
   },
   "source": [
    "The first entry of the sorted list is the most common complaint. With a little more code, a plot can be generated. Can you do it?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "7_iruiYiiWmd"
   },
   "source": [
    "(c) 2017 - 2022 Tuplex team"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [],
   "provenance": []
  },
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
